package gokafka

import (
	"context"

	"github.com/Shopify/sarama"
	log "github.com/sirupsen/logrus"
)

type ComsumerHandler interface {
	Handle(ctx context.Context, topic string, messages []byte) error
}

type ComsumerGroupHandler struct {
	Handler ComsumerHandler
}

func (h ComsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
	//return h.Handler.Init(session.Context())
	log.Println("Set Up!")
	return nil
}

func (h ComsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
	//return h.Handler.Uinit(session.Context())
	log.Println("Clean Up!")
	return nil
}

func (h ComsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	session.MemberID()
	for msg := range claim.Messages() {
		err := h.Handler.Handle(session.Context(), msg.Topic, msg.Value)
		if err != nil {
			log.Error("ConsumeClaim error:", err)
		}
		session.MarkMessage(msg, "")
	}
	return nil
}
